package com.atguigu.Flink.wordCount;

import com.atguigu.Flink.POJO.WordCount;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Streaming word count that carries each word in a {@link WordCount} object.
 *
 * <p>The job reads text lines from a socket, splits every line into
 * whitespace-separated words, emits one {@code WordCount(word, 1L)} per word,
 * keys the stream by {@code WordCount.getWord()} and keeps a running sum of the
 * {@code count} field, printing each updated result to standard output.
 */
public class FLink05_PojoWordCount {
    /** Host of the text socket the job reads from. */
    private static final String SOCKET_HOST = "hadoop102";

    /** Port of the text socket the job reads from. */
    private static final int SOCKET_PORT = 8888;

    /**
     * Builds the word-count pipeline and submits it for execution.
     *
     * @param args command-line arguments; not used
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism 1 keeps the printed output in a single, un-prefixed sequence.
        env.setParallelism(1);

        DataStreamSource<String> lines = env.socketTextStream(SOCKET_HOST, SOCKET_PORT);

        // An anonymous class (rather than a lambda) keeps the generic output type
        // visible to Flink's type extraction.
        SingleOutputStreamOperator<WordCount> words = lines.flatMap(new FlatMapFunction<String, WordCount>() {
            @Override
            public void flatMap(String line, Collector<WordCount> out) throws Exception {
                // Split on runs of whitespace so repeated separators do not create tokens.
                for (String token : line.split("\\s+")) {
                    // A blank line or leading whitespace still yields one empty token;
                    // skip it so the empty string is never counted as a word.
                    if (token.isEmpty()) {
                        continue;
                    }
                    out.collect(new WordCount(token, 1L));
                }
            }
        });

        KeyedStream<WordCount, String> keyedByWord = words.keyBy(new KeySelector<WordCount, String>() {
            @Override
            public String getKey(WordCount wordCount) throws Exception {
                return wordCount.getWord();
            }
        });

        // Field-name aggregation: "count" must match the field name declared in WordCount.
        SingleOutputStreamOperator<WordCount> totals = keyedByWord.sum("count");
        totals.print();

        env.execute();
    }
}
